/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package cn.ac.iie.dpl.data.mmloader;

import cn.ac.iie.dpl.data.mmloader.globalParas.GlobalParas;
import cn.ac.iie.dpl.data.mmloader.hook.KillHandler;
import cn.ac.iie.dpl.data.mmloader.mq.RocketConsumer;
import cn.ac.iie.dpl.data.mmloader.mq.RocketProducer;
import cn.ac.iie.dpl.data.mmloader.worker.MMLoader;
import com.alibaba.rocketmq.client.exception.MQClientException;
import iie.mm.client.ClientAPI;
import iie.mm.client.ClientConf;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

/**
 *
 * @author Li Mingyang
 */
public class DplDataMmloader {

    public static Logger logger = Logger.getLogger(DplDataMmloader.class.getName());
    public static Map<String, RocketConsumer> srcConsumerMap = new HashMap<String, RocketConsumer>();

    public static void main(String[] args) {
        PropertyConfigurator.configure("log4j.properties");
        //搞配置项
        try {
            GlobalParas.init();
        } catch (Exception ex) {
            logger.error("something wrong with globalParas " + ex.getMessage(), ex);
            System.exit(0);
        }
        //启动producer
        RocketProducer.init("dpl-mmloader", GlobalParas.nameServer);
        //搞多媒体
        try {
            List<Database> dbs = GlobalParas.metastoreCli.client.get_all_attributions();
            for (Database db : dbs) {
                String dbName = db.getName();
                String mmurl = (String) db.getParameters().get("mm.url");
                Map<String, ArrayList<ClientAPI>> schemaClientMap = new HashMap<String, ArrayList<ClientAPI>>();
                if ((mmurl != null) && (!"".equals(mmurl))) {
                    for (Map.Entry<String, Integer> entry : GlobalParas.srcSchemaThreadSizeMap.entrySet()) {
                        ArrayList<ClientAPI> clients = new ArrayList<ClientAPI>();
                        for (int i = 0; i < entry.getValue(); i++) {
                            ClientConf cc = new ClientConf();
                            ClientAPI ca = new ClientAPI(cc);
                            ca.init(mmurl);
                            clients.add(ca);
                        }
                        schemaClientMap.put(entry.getKey(), clients);
                    }
                    logger.info("the mm.url for " + dbName + " is " + mmurl);
                    GlobalParas.ClientAPIMap.put(dbName, schemaClientMap);
                } else {
                    logger.error("the mm.url for " + dbName + " is null or empty");
                    if (dbName.equalsIgnoreCase(GlobalParas.region)) {
                        System.exit(0);
                    }
                }
            }
        } catch (Exception ex) {
            logger.error(ex, ex);
            System.exit(0);
        }
        //启动处理线程
        for (String schemaName : GlobalParas.ClientAPIMap.get(GlobalParas.region).keySet()) {
            for (int index = 0; index < GlobalParas.ClientAPIMap.get(GlobalParas.region).get(schemaName).size(); index++) {
                MMLoader loader = new MMLoader();
                loader.init(schemaName, GlobalParas.srcSchemaQueneMap.get(schemaName),GlobalParas.ClientAPIMap.get(GlobalParas.region).get(schemaName).get(index));
                Thread senThread = new Thread(loader);
                senThread.setName(schemaName + "-" + index);
                senThread.start();
            }
        }
        //准备接收数据
        for (String schemaName : GlobalParas.srcSchemaName) {
            srcConsumerMap.put(schemaName, new RocketConsumer(schemaName, GlobalParas.srcSchemaQueneMap.get(schemaName)));
        }
        //启动接收数据
        try {
            for (Map.Entry<String, RocketConsumer> entry : srcConsumerMap.entrySet()) {
                entry.getValue().stratUp();//从srctopic取出数据解析成record放到RocketConsumer的Queue里
            }
        } catch (MQClientException ex) {
            logger.error(ex, ex);
        }
        //准备杀死
        KillHandler killhandle = new KillHandler();
        killhandle.registerSignal("TERM");
        logger.info("init the kill handler done ");
        do {
            try {
                Thread.sleep(20000L);
            } catch (Exception ex) {
            }
            logger.info("check the system is should exit ... \n");
        } while (!GlobalParas.isShouldExit.get());
        //关闭数据接收
        for (Map.Entry<String, RocketConsumer> entry : srcConsumerMap.entrySet()) {
            entry.getValue().stop();
        }
        logger.info("stop the consumer ok ");
        int count = 0;
        while (true) {
            try {
                Thread.sleep(5L);
            } catch (Exception ex) {
            }
            count++;
            for (Map.Entry<String, LinkedBlockingQueue<GenericRecord>> entry : GlobalParas.srcSchemaQueneMap.entrySet()) {
                if (!entry.getValue().isEmpty()) {
                    count = 0;
                }
            }
            if (count >= 2000) {
                logger.info("now the system will exit safely .... \n\n");
                System.exit(0);
            }
        }
    }
}
